-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add retries to cagg httpSinkActor #124
base: master
Are you sure you want to change the base?
Conversation
final int attempt) { | ||
final CompletableFuture<Response> promise = new CompletableFuture<>(); | ||
_client.executeRequest(request, new ResponseAsyncCompletionHandler(promise)); | ||
promise.handle((result, err) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're probably going to need a bit of a backoff here. I would recommend using an exponential backoff strategy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least adding some support for backoff would be nice. But I'm also okay with adding that in a second step.
_client.executeRequest(request, new ResponseAsyncCompletionHandler(promise)); | ||
promise.handle((result, err) -> { | ||
if (err == null && ACCEPTED_STATUS_CODES.contains(result.getStatusCode())) { | ||
try (Metrics metrics = _metricsFactory.create()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating a new Metrics object for each POST is going to be quite expensive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's not ideal; unfortunately, there is no end-to-end scoped instrumentation in CAGG. Not to mention the event loop here is async and so decoupled. If CAGG supports periodic metrics, we could use that, but this is not unreasonable (there's an MF in the Router so for each incoming request; Ref).
@@ -143,7 +145,7 @@ public void testPost() throws InterruptedException, IOException { | |||
Mockito.any()); | |||
Mockito.verify(_mockMetrics, Mockito.times(1)).startTimer("sinks/http_post/kairosdb_sink_test/request_latency"); | |||
Mockito.verify(_mockMetrics, Mockito.times(1)).stopTimer("sinks/http_post/kairosdb_sink_test/request_latency"); | |||
Mockito.verify(_mockMetrics, Mockito.times(1)).close(); | |||
Mockito.verify(_mockMetrics, Mockito.times(2)).close(); | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like you're missing a test case for the feature you're trying to add. Can you please add a test showing the retry on a failed POST?
@@ -306,6 +323,31 @@ private void fireNextRequest() { | |||
PatternsCS.pipe(responsePromise, context().dispatcher()).to(self()); | |||
} | |||
|
|||
private HttpResponse sendHttpRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a base case
private HttpResponse sendHttpRequest(
final Request request
) {
return this.sendHttpRequest(request, 0)
}
.willReturn(WireMock.aResponse() | ||
.withStatus(404))); | ||
_kairosDbSinkBuilder.setMaxRetries(1).build().recordAggregateData(TestBeanFactory.createPeriodicData()); | ||
Thread.sleep(3000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(I recognize there's already a Thread.sleep(1000) in this test case, but - why 3000?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I added exponential backoff for the retry, which means, after initial HTTPrequest, the thread will wait 1000 milliseconds before sending the next attempt. So if we use 1000 here this test will fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sleeps are a really bad way to deal with this and will cause brittle tests. Waiting for an asyc completion with a timeout is probably a better way to go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally looks pretty good
final Request request, | ||
final int attempt) { | ||
try { | ||
Thread.sleep(Double.valueOf(Math.pow(2, attempt) - 1).longValue() * 1000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can't do a thread sleep here. It can block too much. The rule is basically not to block in an asyc method.
Also, this isn't proper exponential backoff. Please take a look at https://en.m.wikipedia.org/wiki/Exponential_backoff
Specifically the part about randomization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By saying "block too much", do you mean 1) it can block more than the time set or 2)it occupies too much resources because it is a busy wait?
I found a solution here using ScheduledExecutorService
which can solve 1).
And we can probably use wait() for a non busy wait if the problem is 2).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to understand how this code is executing. This code is in an Akka actor, which means it is running on a thread pool shared with other actors. There is a limited amount of threads. In an actor, you should not do anything that blocks the thread (in most cases, there can be exceptions if you are very careful and configure things explicitly). But this also gives us access to Akka timers where you can schedule execution in a non blocking way. Please take a look at the following documentation.
https://doc.akka.io/docs/akka/current/actors.html#introduction
https://doc.akka.io/docs/akka/current/dispatchers.html
https://doc.akka.io/docs/akka/current/actors.html#actors-timers
.willReturn(WireMock.aResponse() | ||
.withStatus(404))); | ||
_kairosDbSinkBuilder.setMaxRetries(1).build().recordAggregateData(TestBeanFactory.createPeriodicData()); | ||
Thread.sleep(3000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sleeps are a really bad way to deal with this and will cause brittle tests. Waiting for an asyc completion with a timeout is probably a better way to go
No description provided.